package org.zjvis.datascience.common.etl;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import io.netty.util.internal.StringUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.zjvis.datascience.common.constant.SqlTemplate;
import org.zjvis.datascience.common.enums.ETLEnum;
import org.zjvis.datascience.common.enums.PythonDateTypeFormatEnum;
import org.zjvis.datascience.common.enums.SubTypeEnum;
import org.zjvis.datascience.common.model.CompositeConfig;
import org.zjvis.datascience.common.model.ConfigComponent;
import org.zjvis.datascience.common.model.DimensionConfig;
import org.zjvis.datascience.common.sql.SqlHelper;
import org.zjvis.datascience.common.util.DatasetUtil;
import org.zjvis.datascience.common.util.SqlUtil;
import org.zjvis.datascience.common.util.ToolUtil;
import org.zjvis.datascience.common.vo.TaskVO;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;


/**
 * @description ETL-Filter筛选器类
 * @date 2021-12-27
 */
public class Filter extends BaseETL {

    private static String FILTER_SQL = "CREATE VIEW %s AS SELECT %s FROM %s";

    private List<String> columnSelect;

    private List<ConfigComponent> rowSelect;

    public Filter() {
        super(ETLEnum.FILTER.name(), SubTypeEnum.ETL_OPERATE.getVal(),
                SubTypeEnum.ETL_OPERATE.getDesc());
        this.maxParentNumber = 1;
    }


    public void parserConf(JSONObject conf) {
        if (conf.getJSONArray("columnsFilter") == null || conf.getJSONArray("rowsFilter") == null) {
            return;
        }
        columnSelect = conf.getJSONArray("columnsFilter").toJavaList(String.class);

        rowSelect = Lists.newArrayList();

        JSONArray filterArray = conf.getJSONArray("rowsFilter");
        if (filterArray != null) {
            for (int i = 0; i < filterArray.size(); i++) {
                JSONArray subFilterArray = (JSONArray) filterArray.get(i);
                ConfigComponent component = null;
                if (subFilterArray.size() > 1) {
                    CompositeConfig composite = new CompositeConfig();
                    composite.setType("AND");
                    for (int j = 0; j < subFilterArray.size(); j++) {
                        JSONObject jsonObj = subFilterArray.getJSONObject(j);
                        DimensionConfig dc = new DimensionConfig();
                        dc.setFieldName(
                                SqlHelper.stringEscapeHelper(jsonObj.getString("fieldName")));
                        dc.setFilterType(jsonObj.getString("condition"));
                        dc.setValues(Lists.newArrayList(
                                SqlHelper.stringEscapeHelper(jsonObj.getString("value"))));
                        composite.add(dc);
                    }
                    component = composite;
                } else {
                    if (subFilterArray.size() > 0) {
                        JSONObject jsonObj = subFilterArray.getJSONObject(0);
                        DimensionConfig dc = new DimensionConfig();
                        dc.setFieldName(
                                SqlHelper.stringEscapeHelper(jsonObj.getString("fieldName")));
                        dc.setFilterType(jsonObj.getString("condition"));
                        dc.setValues(Lists.newArrayList(
                                SqlHelper.stringEscapeHelper(jsonObj.getString("value"))));
                        component = dc;
                    }
                }

                if (component != null) {
                    rowSelect.add(component);
                }
            }
        }
    }

    public String initSql(JSONObject conf, List<SqlHelper> sqlHelpers, long timeStamp,
                          String engineName) {
        this.engineName = engineName;
        String ret = StringUtil.EMPTY_STRING;
        JSONArray input = conf.getJSONArray("input");
        String table = input.getJSONObject(0).getString("tableName");
        table = ToolUtil.alignTableName(table, timeStamp);
        JSONArray output = conf.getJSONArray("output");
        String outTable = output.getJSONObject(0).getString("tableName") + timeStamp;
        if (CollectionUtils.isEmpty(columnSelect)) {
            ret = String.format(FILTER_SQL, outTable, "*", table);
        } else {
            ret = String.format(FILTER_SQL, outTable, Joiner.on(", ").join(columnSelect), table);
        }
        if (sqlHelpers == null || sqlHelpers.size() == 0) {
            return null;
        }
        String where = sqlHelpers.get(0).assembleOrFilter(rowSelect);

        for (Object tmpFilter : rowSelect) {
            if (tmpFilter instanceof DimensionConfig) {
                DimensionConfig filter = (DimensionConfig) tmpFilter;
                String col = filter.getFieldName();
                String value = filter.getValues().get(0);
                if (!filter.getFilterType().equals("=") && DatasetUtil.isTime(value)) {
                    String formatCol = String.format("pipeline.sys_func_format_time(\"%s\"::varchar, '%s')", col,
                            PythonDateTypeFormatEnum.FORMAT_0.getVal());
                    where = where.replaceAll(SqlUtil.formatPGSqlColName(col), formatCol);
                }
            } else if (tmpFilter instanceof CompositeConfig) {
                // 暂时不需要处理
            }
        }

        if (StringUtils.isNotEmpty(where)) {
            ret = ret + " " + where;
        }

        return ret;
    }

    /**
     * 定义每个算子的输出表格式
     *
     * @param vo
     * @return
     */
    public void defineOutput(TaskVO vo) {
        JSONArray jsonArray = new JSONArray();
        JSONObject jsonObject = vo.getData();
        List<String> columnsFilter = jsonObject.getJSONArray("columnsFilter") != null ?
                jsonObject.getJSONArray("columnsFilter").toJavaList(String.class) : null;
        String tableName = String
                .format(SqlTemplate.VIEW_TABLE_NAME, vo.getPipelineId(), vo.getId());
        JSONObject item = new JSONObject();
        JSONArray outputColTypes = new JSONArray();
        JSONArray input = jsonObject.getJSONArray("input");
        if (input != null && input.size() > 0) {
            List<String> colNames = input.getJSONObject(0).getJSONArray("tableCols")
                    .toJavaList(String.class);
            List<String> colTypes = input.getJSONObject(0).getJSONArray("columnTypes")
                    .toJavaList(String.class);
            if (columnsFilter != null && columnsFilter.size() > 0) {
                this.prepareOutputColumnTypes(outputColTypes, columnsFilter, colNames, colTypes);
                jsonObject.put("columnsFilter", columnsFilter);
            } else {
                // first load, we need select all items
                columnsFilter = new ArrayList<>();
                columnsFilter.addAll(colNames);
                outputColTypes.addAll(colTypes);
                jsonObject.put("columnsFilter", columnsFilter);
            }
        }
        JSONObject numberFormat = input.getJSONObject(0).getJSONObject("numberFormat");
        if (numberFormat != null && columnsFilter != null && !columnsFilter.isEmpty()) {
            Set<String> cols = numberFormat.keySet();
            for (String col : cols) {
                if (!columnsFilter.contains(col)) {
                    numberFormat.remove(col);
                }
            }

        }
        item.put("numberFormat", numberFormat);
        item.put("tableName", tableName);
        item.put("tableCols", columnsFilter);
        item.put("nodeName", vo.getName() == null ? ETLEnum.FILTER.toString() : vo.getName());
        item.put("columnTypes", outputColTypes);
        this.setSubTypeForOutput(item);
        jsonArray.add(item);
        jsonObject.put("output", jsonArray);
        vo.setData(jsonObject);
    }

    public void initTemplate(JSONObject data) {
        data.put("columnsFilter", new JSONArray());
        data.put("rowsFilter", new JSONArray());
        baseInitTemplate(data);
    }
}
